/*******************************************************************************
 * Package: com.song.rabbit.mq.web
 * Type:    MqController
 * Date:    2023-05-14 18:52
 *
 * Copyright (c) 2023 HUANENG GUICHENG TRUST CORP.,LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.rocket;

import cn.hutool.log.Log;
import com.song.common.constants.MQConstant;
import com.song.common.util.JsonUtils;
import com.song.rocket.dto.DogDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 功能描述：
 *
 * @author Songxianyang
 * @date 2023-05-14 18:52
 */
@Slf4j
@RestController
@RequestMapping("rocket-producer")
public class Controller {

    @Resource
    private RocketMQTemplate rocketMQTemplate;



    @GetMapping("send")
    public String send(@RequestParam String id) {
        rocketMQTemplate.convertAndSend(MQConstant.USER_TOPIC+":tag1","发送信息到rocket:::"+id);
        return "success";
    }
    // 简单发送消息
    /**
     * 单向消息:::sendOneWay
     * 指发送消息后，不需要等待Broker的响应，直接返回。
     * 这种方式适用于不需要关注消息发送结果的场景，如日志记录、统计信息等
     * @param id
     * @return
     */
    @GetMapping("item")
    public String item(@RequestParam String id) {
        Message<String> message = MessageBuilder.withPayload(JsonUtils.toJson(new DogDto(id, "天天编号" + id)))
                .build();
        rocketMQTemplate.sendOneWay(MQConstant.DOG_TOPIC,message);
        return "success";
    }

    /**
     * 同步发送消息   syncSend   消费者并发消费
     syncSend方法会阻塞当前线程，直到消息发送完成并收到了消息服务器的响应。
     如果消息发送成功，syncSend方法会返回一个SendResult对象，包含了消息的发送状态、消息ID等信息。
     如果消息发送失败，syncSend方法会抛出一个MessagingException异常。
     * @return
     */
    @GetMapping("item-concurrent-sync-send")
    public String itemConcurrent() {
        for (int i = 0, size = 100; i < size; i++) {
            Message<String> message = MessageBuilder.withPayload(JsonUtils.toJson(new DogDto(String.valueOf(i), "天天编号" + i)))
                    .build();
            log.info("获取发送的消息：>>>>>>{}",JsonUtils.toJson(new DogDto(String.valueOf(i), "天天编号" + i)));
            SendResult sendResult = rocketMQTemplate.syncSend(MQConstant.DOG_TOPIC, message);
            log.info("获取发送之后的消息状态：>>>>>>{}",JsonUtils.toJson(sendResult.getSendStatus()));
        }

        return "success";
    }


    /**
     * 异步发送消息   asyncSend   消费者顺序消费
     asyncSend方法不会阻塞当前线程，而是在另 一个线程中 异步发送消息。
     因此，asyncSend方法会立即返回，不会等待消息发送完成。
     如果需要等待消息发送完成并处理发送结果，可以使用SendCallback回调接口。
     * @return
     */
    @GetMapping("concurrent-a-sync-send")
    public String concurrentAsyncSend() {
        for (int i = 0, size = 100; i < size; i++) {
            Message<String> message = MessageBuilder.withPayload(JsonUtils.toJson(new DogDto(String.valueOf(i), "天天编号" + i)))
                    .build();
            log.info("获取发送的消息：>>>>>>{}",JsonUtils.toJson(new DogDto(String.valueOf(i), "天天编号" + i)));

            rocketMQTemplate.asyncSend(MQConstant.DOG_TOPIC, message, new SendCallback() {
                // 消息是否成功发送到rocket
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("获取发送成功后的消息状态：>>>>>>{}",sendResult.getSendStatus());
                    log.info("获取message：>>>>>>{}",message.getPayload());
                    log.info("------------");
                }
                //有异常
                @Override
                public void onException(Throwable throwable) {
                    log.info("消息发送到rocket失败：>>>>>>{}", throwable.getMessage());
                }
            });
        }

        return "success";
    }
    // 顺序发送消息

    /**
     * 单向顺序消息  sendOneWayOrderly
     * 支持消费者按照发送消息的先后顺序获取消息，从而实现业务场景中的顺序处理。
     * 相比其他类型消息，顺序消息在发送、存储和投递的处理过程中，更多强调多条消息间的先后顺序关系。
     * @return
     */
    @GetMapping("send-OneWay-Orderly")
    public String sendOneWayOrderly() {
        for (int i = 0, size = 100; i < size; i++) {
            Message<String> message = MessageBuilder.withPayload(JsonUtils.toJson(new DogDto(String.valueOf(i), "天天编号" + i)))
                    .build();
            log.info("获取发送的消息：>>>>>>{}",JsonUtils.toJson(new DogDto(String.valueOf(i), "天天编号" + i)));
            rocketMQTemplate.sendOneWayOrderly(MQConstant.DOG_TOPIC, message,"dog-constant");
        }
        return "success";
    }

    @GetMapping("test")
    public String test() {
        return "success";
    }
}
